{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "# Word Count\n",
    "\n",
    "Counting the number of occurances of words in a text is a popular first exercise using map-reduce."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## The Task\n",
    "**Input:** A text file consisisting of words separated by spaces.  \n",
    "**Output:** A list of words and their counts, sorted from the most to the least common.\n",
    "\n",
    "We will use the book \"Moby Dick\" as our input."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [],
   "source": [
    "#start the SparkContext\n",
    "from pyspark import SparkContext\n",
    "sc=SparkContext(master=\"local[4]\")\n",
    "\n",
    "# set import path\n",
    "import sys\n",
    "sys.path.append('../../Utils/')\n",
    "#from plan2table import plan2table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def pretty_print_plan(rdd):\n",
    "    for x in rdd.toDebugString().decode().split('\\n'):\n",
    "        print(x)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "## Download data file from S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "ename": "ConnectionError",
     "evalue": "HTTPSConnectionPool(host='mas-dse-open.s3.amazonaws.com', port=443): Max retries exceeded with url: /Moby-Dick.txt (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f932d1b9ba8>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mgaierror\u001b[0m                                  Traceback (most recent call last)",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connection.py\u001b[0m in \u001b[0;36m_new_conn\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    140\u001b[0m             conn = connection.create_connection(\n\u001b[0;32m--> 141\u001b[0;31m                 (self.host, self.port), self.timeout, **extra_kw)\n\u001b[0m\u001b[1;32m    142\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/util/connection.py\u001b[0m in \u001b[0;36mcreate_connection\u001b[0;34m(address, timeout, source_address, socket_options)\u001b[0m\n\u001b[1;32m     59\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 60\u001b[0;31m     \u001b[0;32mfor\u001b[0m \u001b[0mres\u001b[0m \u001b[0;32min\u001b[0m \u001b[0msocket\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgetaddrinfo\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhost\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfamily\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msocket\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mSOCK_STREAM\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     61\u001b[0m         \u001b[0maf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msocktype\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mproto\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcanonname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msa\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/socket.py\u001b[0m in \u001b[0;36mgetaddrinfo\u001b[0;34m(host, port, family, type, proto, flags)\u001b[0m\n\u001b[1;32m    744\u001b[0m     \u001b[0maddrlist\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 745\u001b[0;31m     \u001b[0;32mfor\u001b[0m \u001b[0mres\u001b[0m \u001b[0;32min\u001b[0m \u001b[0m_socket\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgetaddrinfo\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mhost\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfamily\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mproto\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mflags\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    746\u001b[0m         \u001b[0maf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msocktype\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mproto\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcanonname\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0msa\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mres\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mgaierror\u001b[0m: [Errno -3] Temporary failure in name resolution",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[0;31mNewConnectionError\u001b[0m                        Traceback (most recent call last)",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connectionpool.py\u001b[0m in \u001b[0;36murlopen\u001b[0;34m(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)\u001b[0m\n\u001b[1;32m    600\u001b[0m                                                   \u001b[0mbody\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mbody\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mheaders\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mheaders\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 601\u001b[0;31m                                                   chunked=chunked)\n\u001b[0m\u001b[1;32m    602\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connectionpool.py\u001b[0m in \u001b[0;36m_make_request\u001b[0;34m(self, conn, method, url, timeout, chunked, **httplib_request_kw)\u001b[0m\n\u001b[1;32m    345\u001b[0m         \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 346\u001b[0;31m             \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_validate_conn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconn\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    347\u001b[0m         \u001b[0;32mexcept\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mSocketTimeout\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mBaseSSLError\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connectionpool.py\u001b[0m in \u001b[0;36m_validate_conn\u001b[0;34m(self, conn)\u001b[0m\n\u001b[1;32m    849\u001b[0m         \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mgetattr\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mconn\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m'sock'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m  \u001b[0;31m# AppEngine might not have  `.sock`\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 850\u001b[0;31m             \u001b[0mconn\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mconnect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    851\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connection.py\u001b[0m in \u001b[0;36mconnect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    283\u001b[0m         \u001b[0;31m# Add certificate verification\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 284\u001b[0;31m         \u001b[0mconn\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_new_conn\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    285\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connection.py\u001b[0m in \u001b[0;36m_new_conn\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    149\u001b[0m             raise NewConnectionError(\n\u001b[0;32m--> 150\u001b[0;31m                 self, \"Failed to establish a new connection: %s\" % e)\n\u001b[0m\u001b[1;32m    151\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mNewConnectionError\u001b[0m: <urllib3.connection.VerifiedHTTPSConnection object at 0x7f932d1b9ba8>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[0;31mMaxRetryError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/adapters.py\u001b[0m in \u001b[0;36msend\u001b[0;34m(self, request, stream, timeout, verify, cert, proxies)\u001b[0m\n\u001b[1;32m    439\u001b[0m                     \u001b[0mretries\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmax_retries\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 440\u001b[0;31m                     \u001b[0mtimeout\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    441\u001b[0m                 )\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/connectionpool.py\u001b[0m in \u001b[0;36murlopen\u001b[0;34m(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw)\u001b[0m\n\u001b[1;32m    638\u001b[0m             retries = retries.increment(method, url, error=e, _pool=self,\n\u001b[0;32m--> 639\u001b[0;31m                                         _stacktrace=sys.exc_info()[2])\n\u001b[0m\u001b[1;32m    640\u001b[0m             \u001b[0mretries\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msleep\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/urllib3/util/retry.py\u001b[0m in \u001b[0;36mincrement\u001b[0;34m(self, method, url, response, error, _pool, _stacktrace)\u001b[0m\n\u001b[1;32m    387\u001b[0m         \u001b[0;32mif\u001b[0m \u001b[0mnew_retry\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mis_exhausted\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 388\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0mMaxRetryError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_pool\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0murl\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0merror\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mResponseError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcause\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    389\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mMaxRetryError\u001b[0m: HTTPSConnectionPool(host='mas-dse-open.s3.amazonaws.com', port=443): Max retries exceeded with url: /Moby-Dick.txt (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f932d1b9ba8>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))",
      "\nDuring handling of the above exception, another exception occurred:\n",
      "\u001b[0;31mConnectionError\u001b[0m                           Traceback (most recent call last)",
      "\u001b[0;32m<timed exec>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/api.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(url, params, **kwargs)\u001b[0m\n\u001b[1;32m     70\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     71\u001b[0m     \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msetdefault\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'allow_redirects'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 72\u001b[0;31m     \u001b[0;32mreturn\u001b[0m \u001b[0mrequest\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'get'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0murl\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mparams\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mparams\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     73\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     74\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/api.py\u001b[0m in \u001b[0;36mrequest\u001b[0;34m(method, url, **kwargs)\u001b[0m\n\u001b[1;32m     56\u001b[0m     \u001b[0;31m# cases, and look like a memory leak in others.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     57\u001b[0m     \u001b[0;32mwith\u001b[0m \u001b[0msessions\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mSession\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0msession\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 58\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0msession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrequest\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmethod\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mmethod\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0murl\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0murl\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     59\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     60\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/sessions.py\u001b[0m in \u001b[0;36mrequest\u001b[0;34m(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)\u001b[0m\n\u001b[1;32m    506\u001b[0m         }\n\u001b[1;32m    507\u001b[0m         \u001b[0msend_kwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mupdate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msettings\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 508\u001b[0;31m         \u001b[0mresp\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mprep\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0msend_kwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    509\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    510\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mresp\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/sessions.py\u001b[0m in \u001b[0;36msend\u001b[0;34m(self, request, **kwargs)\u001b[0m\n\u001b[1;32m    616\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    617\u001b[0m         \u001b[0;31m# Send the request\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 618\u001b[0;31m         \u001b[0mr\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0madapter\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mrequest\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    619\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    620\u001b[0m         \u001b[0;31m# Total elapsed time of the request (approximately)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/opt/conda/lib/python3.6/site-packages/requests/adapters.py\u001b[0m in \u001b[0;36msend\u001b[0;34m(self, request, stream, timeout, verify, cert, proxies)\u001b[0m\n\u001b[1;32m    506\u001b[0m                 \u001b[0;32mraise\u001b[0m \u001b[0mSSLError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrequest\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mrequest\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    507\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 508\u001b[0;31m             \u001b[0;32mraise\u001b[0m \u001b[0mConnectionError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mrequest\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mrequest\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    509\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    510\u001b[0m         \u001b[0;32mexcept\u001b[0m \u001b[0mClosedPoolError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;31mConnectionError\u001b[0m: HTTPSConnectionPool(host='mas-dse-open.s3.amazonaws.com', port=443): Max retries exceeded with url: /Moby-Dick.txt (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f932d1b9ba8>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "##If this cell fails, download the file from https://mas-dse-open.s3.amazonaws.com/Moby-Dick.txt on your browser\n",
    "# and put it in the '../../Data/ directory\n",
    "import requests\n",
    "data_dir='../../Data'\n",
    "filename='Moby-Dick.txt'\n",
    "url = \"https://mas-dse-open.s3.amazonaws.com/\"+filename\n",
    "local_path = data_dir+'/'+filename\n",
    "!mkdir -p {data_dir}\n",
    "# Copy URL content to local_path\n",
    "r = requests.get(url, allow_redirects=True)\n",
    "open(local_path, 'wb').write(r.content)\n",
    "\n",
    "# check that the text file is where we expect it to be\n",
    "!ls -l $local_path"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Define an RDD that will read the file\n",
    "* Execution of read is **lazy**\n",
    "* File has been opened.\n",
    "* Reading starts when stage is executed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
      "Wall time: 1.6 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "text_file = sc.textFile(data_dir+'/'+filename)\n",
    "type(text_file) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## Steps for counting the words\n",
    "\n",
    "* split line by spaces.\n",
    "* map `word` to `(word,1)`\n",
    "* count the number of occurances of each word."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 20 ms, sys: 10 ms, total: 30 ms\n",
      "Wall time: 318 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "words =     text_file.flatMap(lambda line: line.split(\" \"))\n",
    "not_empty = words.filter(lambda x: x!='') \n",
    "key_values= not_empty.map(lambda word: (word, 1)) \n",
    "counts=     key_values.reduceByKey(lambda a, b: a + b)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "### flatMap()\n",
    "Note the line:\n",
    "```python\n",
    "words =     text_file.flatMap(lambda line: line.split(\" \"))\n",
    "```\n",
    "Why are we using `flatMap`, rather than `map`?\n",
    "\n",
    "The reason is that the operation `line.split(\" \")` generates a **list** of strings, so had we used `map` the result would be an RDD of lists of words. Not an RDD of words.\n",
    "\n",
    "The difference between `map` and `flatMap` is that the second expects to get a list as the result from the map and it **concatenates** the lists to form the RDD."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## The execution plan\n",
    "In the last cell we defined the execution plan, but we have not started to execute it.\n",
    "\n",
    "* Preparing the plan took ~100ms, which is a non-trivial amount of time, \n",
    "* But much less than the time it will take to execute it.\n",
    "* Lets have a look a the execution plan."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "### Understanding the details\n",
    "To see which step in the plan corresponds to which RDD we print out the execution plan for each of the RDDs.  \n",
    "\n",
    "Note that the execution plan for `words`, `not_empty` and `key_values` are all the same."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(2) ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "pretty_print_plan(text_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(2) PythonRDD[9] at RDD at PythonRDD.scala:48 []\n",
      " |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "pretty_print_plan(words)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(2) PythonRDD[8] at RDD at PythonRDD.scala:48 []\n",
      " |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "pretty_print_plan(not_empty)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(2) PythonRDD[7] at RDD at PythonRDD.scala:48 []\n",
      " |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "pretty_print_plan(key_values)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false,
    "scrolled": true,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(2) PythonRDD[23] at RDD at PythonRDD.scala:48 []\n",
      " |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []\n",
      " |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
      " +-(2) PairwiseRDD[20] at reduceByKey at <timed exec>:1 []\n",
      "    |  PythonRDD[19] at reduceByKey at <timed exec>:1 []\n",
      "    |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "    |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "pretty_print_plan(counts)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "\n",
    "| Execution plan   | RDD |  Comments |\n",
    "| :---------------------------------------------------------------- | :------------: | :--- |\n",
    "|`(2)_PythonRDD[6] at RDD at PythonRDD.scala:48 []`| **counts** | Final RDD|\n",
    "|`_/__MapPartitionsRDD[5] at mapPartitions at PythonRDD.scala:436 []`| **---\"---** |\n",
    "|`_/__ShuffledRDD[4] at partitionBy at NativeMethodAccessorImpl.java:0 [`| **---\"---** | RDD is partitioned by key |\n",
    "|`_+-(2)_PairwiseRDD[3] at reduceByKey at <timed exec>:4 []`| **---\"---** | Perform mapByKey |\n",
    "|`____/__PythonRDD[2] at reduceByKey at <timed exec>:4 []`| **words, not_empty, key_values** | The result of  partitioning into words|\n",
    "| | |  removing empties, and making into (word,1) pairs|\n",
    "|`____/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at Nat`| **text_file** | The partitioned text |\n",
    "|`____/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMeth`| **---\"---** | The text source |"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Execution\n",
    "Finally we count the number of times each word has occured.\n",
    "Now, finally, the Lazy execution model finally performs some actual work, which takes a significant amount of time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Different words=33781, total words=215133, mean no. occurances per word=6.37\n",
      "CPU times: user 10.5 ms, sys: 7.39 ms, total: 17.9 ms\n",
      "Wall time: 1.04 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "## Run #1\n",
    "Count=counts.count()  # Count = the number of different words\n",
    "Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y) # \n",
    "print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "### Amortization\n",
    "When the same commands are performed repeatedly on the same data, the execution time tends to decrease in later executions.\n",
    "\n",
    "The cells below are identical to the one above, with one exception at `Run #3`\n",
    "\n",
    "Observe that `Run #2` take much less time that `Run #1`. Even though no `cache()` was explicitly requested. The reason is that Spark caches (or materializes) `key_values`, before executing `reduceByKey()` because performng reduceByKey requires a shuffle, and a shuffle requires that the input RDD is materialized. In other words, sometime caching happens even if the programmer did not ask for it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false,
    "scrolled": true,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Different words=33781, total words=215133, mean no. occurances per word=6.37\n",
      "CPU times: user 20 ms, sys: 10 ms, total: 30 ms\n",
      "Wall time: 3.9 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "## Run #2\n",
    "Count=counts.count()\n",
    "Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)\n",
    "print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "#### Explicit Caching\n",
    "In `Run #3` we explicitly ask for `counts` to be cached. This will reduce the execution time in the following run by a little bit, but not by much."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Different words=33781, total words=215133, mean no. occurances per word=6.37\n",
      "CPU times: user 20 ms, sys: 0 ns, total: 20 ms\n",
      "Wall time: 597 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "## Run #3, cache\n",
    "Count=counts.cache().count()\n",
    "Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)\n",
    "print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Different words=33781, total words=215133, mean no. occurances per word=6.37\n",
      "CPU times: user 20 ms, sys: 10 ms, total: 30 ms\n",
      "Wall time: 432 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "#Run #4\n",
    "Count=counts.count()\n",
    "Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)\n",
    "print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Different words=33781, total words=215133, mean no. occurances per word=6.37\n",
      "CPU times: user 20 ms, sys: 0 ns, total: 20 ms\n",
      "Wall time: 307 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "#Run #5\n",
    "Count=counts.count()\n",
    "Sum=counts.map(lambda x:x[1]).reduce(lambda x,y:x+y)\n",
    "print('Different words=%5.0f, total words=%6.0f, mean no. occurances per word=%4.2f'%(Count,Sum,float(Sum)/Count))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Summary\n",
    "This was our first real pyspark program, hurray!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "#### Some things you learned:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "1) An RDD is a distributed immutable array.  \n",
    "It is the core data structure of Spark is an RDD. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "2)  You cannot operate on an RDD directly. Only through **Transformations** and **Actions**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "3) **Transformations** transform an RDD into another RDD."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "4) **Actions** output their results on the head node."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "5) After the action is done, you are using just the head node, not the workers."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "#### Lazy Execution"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "1) RDD operations are added to an **Execution Plan**."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "2) The plan is executed when a result is needed."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "3) Explicit and implicit caching cause internediate results to be saved."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "**Next:** Finding the most common words."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "# Finding the most common words\n",
    "* `counts`: RDD with 33301 pairs of the form `(word,count)`. \n",
    "* Find the 5 most frequent words. \n",
    "* **Method1:** `collect` and sort on head node.\n",
    "* **Method2:** Pure Spark, `collect` only at the end."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "## **Method1:** `collect` and sort on head node \n",
    "### Collect the RDD into the driver node\n",
    "* Collect can take significant time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 50 ms, sys: 0 ns, total: 50 ms\n",
      "Wall time: 200 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "C=counts.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Sort \n",
    "* RDD collected into list in driver node.\n",
    "* No longer using spark parallelism.\n",
    "* Sort in python\n",
    "* will not scale to very large documents."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "code_folding": [],
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "most common words\n",
      "the:\t13766\n",
      "of:\t6587\n",
      "and:\t5951\n",
      "a:\t4533\n",
      "to:\t4510\n",
      "CPU times: user 10 ms, sys: 0 ns, total: 10 ms\n",
      "Wall time: 18.1 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "C.sort(key=lambda x:x[1])\n",
    "print('most common words\\n'+'\\n'.join(['%s:\\t%d'%c for c in reversed(C[-5:])]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "### Compute the mean number of occurances per word."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "count2=33781.000000, sum2=215133.000000, mean2=6.368462\n"
     ]
    }
   ],
   "source": [
    "Count2=len(C)\n",
    "Sum2=sum([i for w,i in C])\n",
    "print('count2=%f, sum2=%f, mean2=%f'%(Count2,Sum2,float(Sum2)/Count2))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## **Method2:** Pure Spark, `collect` only at the end.\n",
    "* Collect into the head node only the more frquent words.\n",
    "* Requires multiple **stages**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Step 1 split, clean and map to `(word,1)`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 0 ns, sys: 0 ns, total: 0 ns\n",
      "Wall time: 66.3 µs\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "word_pairs=text_file.flatMap(lambda x: x.split(' '))\\\n",
    "    .filter(lambda x: x!='')\\\n",
    "    .map(lambda word: (word,1))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### **Step 2** Count occurances of each word."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 10 ms, sys: 0 ns, total: 10 ms\n",
      "Wall time: 37.5 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "counts=word_pairs.reduceByKey(lambda x,y:x+y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### **Step 3** Reverse `(word,count)` to `(count,word)` and sort by key"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 30 ms, sys: 10 ms, total: 40 ms\n",
      "Wall time: 1.49 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "reverse_counts=counts.map(lambda x:(x[1],x[0]))   # reverse order of word and count\n",
    "sorted_counts=reverse_counts.sortByKey(ascending=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Full execution plan\n",
    "\n",
    "We now have a complete plan to compute the most common words in the text. Nothing has been executed yet! Not even a single byte has been read from the file `Moby-Dick.txt` !\n",
    "\n",
    "For more on execution plans and lineage see [jace Klaskowski's blog](https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-lineage.html#toDebugString)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {
    "collapsed": false,
    "scrolled": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "word_pairs:\n",
      "(2) PythonRDD[18] at RDD at PythonRDD.scala:48 []\n",
      " |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      " |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "\n",
      "counts:\n",
      "(2) PythonRDD[23] at RDD at PythonRDD.scala:48 []\n",
      " |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []\n",
      " |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
      " +-(2) PairwiseRDD[20] at reduceByKey at <timed exec>:1 []\n",
      "    |  PythonRDD[19] at reduceByKey at <timed exec>:1 []\n",
      "    |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "    |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "\n",
      "reverse_counts:\n",
      "(2) PythonRDD[30] at RDD at PythonRDD.scala:48 []\n",
      " |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []\n",
      " |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
      " +-(2) PairwiseRDD[20] at reduceByKey at <timed exec>:1 []\n",
      "    |  PythonRDD[19] at reduceByKey at <timed exec>:1 []\n",
      "    |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "    |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "\n",
      "sorted_counts:\n",
      "(2) PythonRDD[31] at RDD at PythonRDD.scala:48 []\n",
      " |  MapPartitionsRDD[29] at mapPartitions at PythonRDD.scala:122 []\n",
      " |  ShuffledRDD[28] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
      " +-(2) PairwiseRDD[27] at sortByKey at <timed exec>:2 []\n",
      "    |  PythonRDD[26] at sortByKey at <timed exec>:2 []\n",
      "    |  MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:122 []\n",
      "    |  ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []\n",
      "    +-(2) PairwiseRDD[20] at reduceByKey at <timed exec>:1 []\n",
      "       |  PythonRDD[19] at reduceByKey at <timed exec>:1 []\n",
      "       |  ../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n",
      "       |  ../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []\n"
     ]
    }
   ],
   "source": [
    "print('word_pairs:')\n",
    "pretty_print_plan(word_pairs)\n",
    "print('\\ncounts:')\n",
    "pretty_print_plan(counts)\n",
    "print('\\nreverse_counts:')\n",
    "pretty_print_plan(reverse_counts)\n",
    "print('\\nsorted_counts:')\n",
    "pretty_print_plan(sorted_counts)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "sorted_counts:\n",
    "\n",
    "| Execution plan   | RDD |\n",
    "| :---------------------------------------------------------------- | :------------: |\n",
    "|`(2)_PythonRDD[20] at RDD at PythonRDD.scala:48 []`| **sorted_counts** |\n",
    "|`_/__MapPartitionsRDD[19] at mapPartitions at PythonRDD.scala:436 []`| **---\"---** |\n",
    "|`_/__ShuffledRDD[18] at partitionBy at NativeMethodAccessorImpl.java:0 `| **---\"---** |\n",
    "|`_+-(2)_PairwiseRDD[17] at sortByKey at <timed exec>:2 []`| **---\"---** |\n",
    "|`____/__PythonRDD[16] at sortByKey at <timed exec>:2 []`| ** counts, reverse_counts** |\n",
    "|`____/__MapPartitionsRDD[13] at mapPartitions at PythonRDD.scala:436 []`| **---\"---** |\n",
    "|`____/__ShuffledRDD[12] at partitionBy at NativeMethodAccessorImpl.java`| **---\"---** |\n",
    "|`____+-(2)_PairwiseRDD[11] at reduceByKey at <timed exec>:1 []`| **---\"---** |\n",
    "|`_______/__PythonRDD[10] at reduceByKey at <timed exec>:1 []`| **word_pairs** |\n",
    "|`_______/__../../Data/Moby-Dick.txt MapPartitionsRDD[1] at textFile at `| **---\"---** |\n",
    "|`_______/__../../Data/Moby-Dick.txt HadoopRDD[0] at textFile at NativeM`| **---\"---** |"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### **Step 4** Take the top 5 words"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "most common words\n",
      "13766:\tthe\n",
      "6587:\tof\n",
      "5951:\tand\n",
      "4533:\ta\n",
      "4510:\tto\n",
      "CPU times: user 10 ms, sys: 0 ns, total: 10 ms\n",
      "Wall time: 398 ms\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "D=sorted_counts.take(5)\n",
    "print('most common words\\n'+'\\n'.join(['%d:\\t%s'%c for c in D]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Summary\n",
    "We showed two ways for finding the most common words:\n",
    "1. Collecting and sorting at the head node. -- Does not scale.\n",
    "2. Using RDDs to the end.\n",
    "\n",
    "See you next time!"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "celltoolbar": "Slideshow",
  "hide_input": false,
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {
    "height": "349px",
    "width": "252px"
   },
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": "block",
   "toc_window_display": true
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
